summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/orchestrator/orchestrator.rs19
1 files changed, 11 insertions, 8 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index d57324b..42e3b63 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -199,10 +199,10 @@ impl<'a> OrchestratorSetup<'a> {
///
/// Represents a result that came from the run of a job inside a container
///
-/// It is either a list of artifacts (with their respective database artifact objects)
+/// It is either a list of artifacts with the UUID of the job they were produced by,
/// or a UUID and an Error object, where the UUID is the job UUID and the error is the
/// anyhow::Error that was issued.
-type JobResult = std::result::Result<(Uuid, Vec<Artifact>), Vec<(Uuid, Error)>>;
+type JobResult = std::result::Result<Vec<(Uuid, Vec<Artifact>)>, Vec<(Uuid, Error)>>;
impl<'a> Orchestrator<'a> {
pub async fn run(self, output: &mut Vec<Artifact>) -> Result<Vec<(Uuid, Error)>> {
@@ -323,7 +323,10 @@ impl<'a> Orchestrator<'a> {
let _ = jobs_result?;
match root_receiver.recv().await {
None => Err(anyhow!("No result received...")),
- Some(Ok((_, artifacts))) => Ok((artifacts, vec![])),
+ Some(Ok(results)) => {
+ let results = results.into_iter().map(|tpl| tpl.1.into_iter()).flatten().collect();
+ Ok((results, vec![]))
+ },
Some(Err(errors)) => Ok((vec![], errors)),
}
}
@@ -486,11 +489,11 @@ impl<'a> JobTask<'a> {
// if the scheduler run reports success,
// it returns the database artifact objects it created!
- Ok(mut artifacts) => {
+ Ok(artifacts) => {
trace!("[{}]: Scheduler returned artifacts = {:?}", self.uuid, artifacts);
- artifacts.extend(received_dependencies.into_iter().map(|(_, v)| v.into_iter()).flatten());
+ received_dependencies.push((self.uuid, artifacts));
self.sender
- .send(Ok((self.uuid, artifacts)))
+ .send(Ok(received_dependencies))
.await?;
},
}
@@ -508,11 +511,11 @@ impl<'a> JobTask<'a> {
/// Return Ok(false) if the channel is empty and we're done receiving
async fn perform_receive(&mut self, received_dependencies: &mut Vec<(Uuid, Vec<Artifact>)>, received_errors: &mut Vec<(Uuid, Error)>) -> Result<bool> {
match self.receiver.recv().await {
- Some(Ok(v)) => {
+ Some(Ok(mut v)) => {
// The task we depend on succeeded and returned an
// (uuid of the job, [Artifact])
trace!("[{}]: Received: {:?}", self.uuid, v);
- received_dependencies.push(v);
+ received_dependencies.append(&mut v);
Ok(true)
},
Some(Err(mut e)) => {