diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2020-11-06 16:25:09 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2020-11-07 14:53:04 +0100 |
commit | e648c06808296eb916bc65f4c6e8e3234c29caae (patch) | |
tree | 57e7f8bbdac6a18d278ea47483658becb73bd976 /src/orchestrator | |
parent | 72a00593c4c8348804fbd9d4fac025ffe15b093a (diff) |
Temp Fix: Collect log receivers
This is another temporary fix to get everything working.
The issue was, that the receivers were immediately dropped and did not
life long enough so that the sending side could actually write to the
channels.
This keeps the receivers alive.
The real fix is to actually write the logs to some destination.
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src/orchestrator')
-rw-r--r-- | src/orchestrator/orchestrator.rs | 26 |
1 files changed, 18 insertions, 8 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index a2eb7e5..2005f6b 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -71,8 +71,9 @@ impl Orchestrator { for jobset in self.jobsets.into_iter() { let merged_store = MergedStores::new(self.release_store.clone(), self.staging_store.clone()); - let results = { // run the jobs in the set - let unordered = futures::stream::FuturesUnordered::new(); + let (results, logs) = { // run the jobs in the set + let unordered_results = futures::stream::FuturesUnordered::new(); + let unordered_receivers = futures::stream::FuturesUnordered::new(); for runnable in jobset.into_runables(&merged_store) { let runnable = runnable?; trace!("Runnable {} for package {}", runnable.uuid(), runnable.package().name()); @@ -80,17 +81,26 @@ impl Orchestrator { let jobhandle = self.scheduler.schedule_job(runnable, sender).await?; trace!("Jobhandle -> {:?}", jobhandle); - unordered.push(async move { - jobhandle.get_result().await + unordered_results.push(async move { + jobhandle.get_result() + .await + }); + unordered_receivers.push(async move { + receiver }); } - unordered.collect::<Result<Vec<_>>>().await? - .into_iter() - .flatten() - .collect::<Vec<PathBuf>>() + (unordered_results.collect::<Result<Vec<_>>>(), unordered_receivers.collect::<Vec<_>>()) }; + let (results, _logs) = tokio::join!(results, logs); + // TODO: Use logs. + + let results = results? + .into_iter() + .flatten() + .collect::<Vec<PathBuf>>(); + { // check if all paths that were written are actually there in the staging store let staging_store_lock = self.staging_store .read() |