summaryrefslogtreecommitdiffstats
path: root/src/orchestrator
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2020-11-06 16:25:09 +0100
committerMatthias Beyer <mail@beyermatthias.de>2020-11-07 14:53:04 +0100
commite648c06808296eb916bc65f4c6e8e3234c29caae (patch)
tree57e7f8bbdac6a18d278ea47483658becb73bd976 /src/orchestrator
parent72a00593c4c8348804fbd9d4fac025ffe15b093a (diff)
Temp Fix: Collect log receivers
This is another temporary fix to get everything working. The issue was, that the receivers were immediately dropped and did not life long enough so that the sending side could actually write to the channels. This keeps the receivers alive. The real fix is to actually write the logs to some destination. Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src/orchestrator')
-rw-r--r--src/orchestrator/orchestrator.rs26
1 files changed, 18 insertions, 8 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index a2eb7e5..2005f6b 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -71,8 +71,9 @@ impl Orchestrator {
for jobset in self.jobsets.into_iter() {
let merged_store = MergedStores::new(self.release_store.clone(), self.staging_store.clone());
- let results = { // run the jobs in the set
- let unordered = futures::stream::FuturesUnordered::new();
+ let (results, logs) = { // run the jobs in the set
+ let unordered_results = futures::stream::FuturesUnordered::new();
+ let unordered_receivers = futures::stream::FuturesUnordered::new();
for runnable in jobset.into_runables(&merged_store) {
let runnable = runnable?;
trace!("Runnable {} for package {}", runnable.uuid(), runnable.package().name());
@@ -80,17 +81,26 @@ impl Orchestrator {
let jobhandle = self.scheduler.schedule_job(runnable, sender).await?;
trace!("Jobhandle -> {:?}", jobhandle);
- unordered.push(async move {
- jobhandle.get_result().await
+ unordered_results.push(async move {
+ jobhandle.get_result()
+ .await
+ });
+ unordered_receivers.push(async move {
+ receiver
});
}
- unordered.collect::<Result<Vec<_>>>().await?
- .into_iter()
- .flatten()
- .collect::<Vec<PathBuf>>()
+ (unordered_results.collect::<Result<Vec<_>>>(), unordered_receivers.collect::<Vec<_>>())
};
+ let (results, _logs) = tokio::join!(results, logs);
+ // TODO: Use logs.
+
+ let results = results?
+ .into_iter()
+ .flatten()
+ .collect::<Vec<PathBuf>>();
+
{ // check if all paths that were written are actually there in the staging store
let staging_store_lock = self.staging_store
.read()