summaryrefslogtreecommitdiffstats
path: root/src/orchestrator/orchestrator.rs
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2020-11-12 16:54:19 +0100
committerMatthias Beyer <mail@beyermatthias.de>2020-11-12 16:56:27 +0100
commit2b971a3ee6f5506994f6f7c45f73d1b0f23aa57d (patch)
tree5b753fda6b70e20e04e5627730bdcb32260d2805 /src/orchestrator/orchestrator.rs
parent8e1af49d76739c802e87b41f3fa36472b2fb64b5 (diff)
Move log receiving to dedicated type
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r--src/orchestrator/orchestrator.rs85
1 files changed, 55 insertions, 30 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index 9c8ca9b..eb7f608 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -8,6 +8,9 @@ use anyhow::Result;
use anyhow::anyhow;
use diesel::PgConnection;
use typed_builder::TypedBuilder;
+use uuid::Uuid;
+use tokio::sync::mpsc::UnboundedReceiver;
+use indicatif::ProgressBar;
use crate::db::models::Submit;
use crate::endpoint::EndpointConfiguration;
@@ -69,7 +72,7 @@ impl Orchestrator {
let mut report_result = vec![];
let number_of_jobsets = self.jobsets.len();
- let _database = self.database;
+ let database = Arc::new(self.database);
for (i, jobset) in self.jobsets.into_iter().enumerate() {
// create a multi-bar for showing the overall jobset status as well as one bar per
@@ -118,37 +121,16 @@ impl Orchestrator {
{
let log_processing_results = futures::stream::FuturesUnordered::new();
- for (job_id, mut log) in logs {
+ for (job_id, log) in logs {
let bar = jobset_bar.add(self.progress_generator.job_bar(&job_id));
+ let db = database.clone();
log_processing_results.push(async move {
- let mut success = None;
- while let Some(logitem) = log.recv().await {
- match logitem {
- LogItem::Line(_) => {
- // ignore
- },
- LogItem::Progress(u) => {
- bar.set_position(u as u64);
- },
- LogItem::CurrentPhase(phasename) => {
- bar.set_message(&format!("{} Phase: {}", job_id, phasename));
- },
- LogItem::State(Ok(s)) => {
- bar.set_message(&format!("{} State Ok: {}", job_id, s));
- success = Some(true);
- },
- LogItem::State(Err(e)) => {
- bar.set_message(&format!("{} State Err: {}", job_id, e));
- success = Some(false);
- },
- }
- }
-
- match success {
- Some(true) => bar.finish_with_message(&format!("{} finished successfully", job_id)),
- Some(false) => bar.finish_with_message(&format!("{} finished with error", job_id)),
- None => bar.finish_with_message(&format!("{} finished", job_id)),
- }
+ LogReceiver {
+ job_id,
+ log,
+ bar,
+ db,
+ }.join()
});
}
@@ -188,3 +170,46 @@ impl Orchestrator {
}
}
+
+struct LogReceiver {
+ job_id: Uuid,
+ log: UnboundedReceiver<LogItem>,
+ bar: ProgressBar,
+ db: Arc<PgConnection>,
+}
+
+impl LogReceiver {
+ async fn join(mut self) -> Result<()> {
+ let mut success = None;
+ while let Some(logitem) = self.log.recv().await {
+ match logitem {
+ LogItem::Line(_) => {
+ // ignore
+ },
+ LogItem::Progress(u) => {
+ self.bar.set_position(u as u64);
+ },
+ LogItem::CurrentPhase(phasename) => {
+ self.bar.set_message(&format!("{} Phase: {}", self.job_id, phasename));
+ },
+ LogItem::State(Ok(s)) => {
+ self.bar.set_message(&format!("{} State Ok: {}", self.job_id, s));
+ success = Some(true);
+ },
+ LogItem::State(Err(e)) => {
+ self.bar.set_message(&format!("{} State Err: {}", self.job_id, e));
+ success = Some(false);
+ },
+ }
+ }
+
+ match success {
+ Some(true) => self.bar.finish_with_message(&format!("{} finished successfully", self.job_id)),
+ Some(false) => self.bar.finish_with_message(&format!("{} finished with error", self.job_id)),
+ None => self.bar.finish_with_message(&format!("{} finished", self.job_id)),
+ }
+
+ Ok(())
+ }
+}
+