diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/orchestrator/orchestrator.rs | 19 |
1 files changed, 11 insertions, 8 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index d57324b..42e3b63 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -199,10 +199,10 @@ impl<'a> OrchestratorSetup<'a> { /// /// Represents a result that came from the run of a job inside a container /// -/// It is either a list of artifacts (with their respective database artifact objects) +/// It is either a list of artifacts with the UUID of the job they were produced by, /// or a UUID and an Error object, where the UUID is the job UUID and the error is the /// anyhow::Error that was issued. -type JobResult = std::result::Result<(Uuid, Vec<Artifact>), Vec<(Uuid, Error)>>; +type JobResult = std::result::Result<Vec<(Uuid, Vec<Artifact>)>, Vec<(Uuid, Error)>>; impl<'a> Orchestrator<'a> { pub async fn run(self, output: &mut Vec<Artifact>) -> Result<Vec<(Uuid, Error)>> { @@ -323,7 +323,10 @@ impl<'a> Orchestrator<'a> { let _ = jobs_result?; match root_receiver.recv().await { None => Err(anyhow!("No result received...")), - Some(Ok((_, artifacts))) => Ok((artifacts, vec![])), + Some(Ok(results)) => { + let results = results.into_iter().map(|tpl| tpl.1.into_iter()).flatten().collect(); + Ok((results, vec![])) + }, Some(Err(errors)) => Ok((vec![], errors)), } } @@ -486,11 +489,11 @@ impl<'a> JobTask<'a> { // if the scheduler run reports success, // it returns the database artifact objects it created! - Ok(mut artifacts) => { + Ok(artifacts) => { trace!("[{}]: Scheduler returned artifacts = {:?}", self.uuid, artifacts); - artifacts.extend(received_dependencies.into_iter().map(|(_, v)| v.into_iter()).flatten()); + received_dependencies.push((self.uuid, artifacts)); self.sender - .send(Ok((self.uuid, artifacts))) + .send(Ok(received_dependencies)) .await?; }, } @@ -508,11 +511,11 @@ impl<'a> JobTask<'a> { /// Return Ok(false) if the channel is empty and we're done receiving async fn perform_receive(&mut self, received_dependencies: &mut Vec<(Uuid, Vec<Artifact>)>, received_errors: &mut Vec<(Uuid, Error)>) -> Result<bool> { match self.receiver.recv().await { - Some(Ok(v)) => { + Some(Ok(mut v)) => { // The task we depend on succeeded and returned an // (uuid of the job, [Artifact]) trace!("[{}]: Received: {:?}", self.uuid, v); - received_dependencies.push(v); + received_dependencies.append(&mut v); Ok(true) }, Some(Err(mut e)) => { |