From 2b971a3ee6f5506994f6f7c45f73d1b0f23aa57d Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Thu, 12 Nov 2020 16:54:19 +0100 Subject: Move log receiving to dedicated type Signed-off-by: Matthias Beyer --- src/orchestrator/orchestrator.rs | 85 ++++++++++++++++++++++++++-------------- 1 file changed, 55 insertions(+), 30 deletions(-) (limited to 'src/orchestrator/orchestrator.rs') diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index 9c8ca9b..eb7f608 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -8,6 +8,9 @@ use anyhow::Result; use anyhow::anyhow; use diesel::PgConnection; use typed_builder::TypedBuilder; +use uuid::Uuid; +use tokio::sync::mpsc::UnboundedReceiver; +use indicatif::ProgressBar; use crate::db::models::Submit; use crate::endpoint::EndpointConfiguration; @@ -69,7 +72,7 @@ impl Orchestrator { let mut report_result = vec![]; let number_of_jobsets = self.jobsets.len(); - let _database = self.database; + let database = Arc::new(self.database); for (i, jobset) in self.jobsets.into_iter().enumerate() { // create a multi-bar for showing the overall jobset status as well as one bar per @@ -118,37 +121,16 @@ impl Orchestrator { { let log_processing_results = futures::stream::FuturesUnordered::new(); - for (job_id, mut log) in logs { + for (job_id, log) in logs { let bar = jobset_bar.add(self.progress_generator.job_bar(&job_id)); + let db = database.clone(); log_processing_results.push(async move { - let mut success = None; - while let Some(logitem) = log.recv().await { - match logitem { - LogItem::Line(_) => { - // ignore - }, - LogItem::Progress(u) => { - bar.set_position(u as u64); - }, - LogItem::CurrentPhase(phasename) => { - bar.set_message(&format!("{} Phase: {}", job_id, phasename)); - }, - LogItem::State(Ok(s)) => { - bar.set_message(&format!("{} State Ok: {}", job_id, s)); - success = Some(true); - }, - LogItem::State(Err(e)) => { - bar.set_message(&format!("{} State Err: {}", job_id, e)); - success = Some(false); - }, - } - } - - match success { - Some(true) => bar.finish_with_message(&format!("{} finished successfully", job_id)), - Some(false) => bar.finish_with_message(&format!("{} finished with error", job_id)), - None => bar.finish_with_message(&format!("{} finished", job_id)), - } + LogReceiver { + job_id, + log, + bar, + db, + }.join() }); } @@ -188,3 +170,46 @@ impl Orchestrator { } } + +struct LogReceiver { + job_id: Uuid, + log: UnboundedReceiver, + bar: ProgressBar, + db: Arc, +} + +impl LogReceiver { + async fn join(mut self) -> Result<()> { + let mut success = None; + while let Some(logitem) = self.log.recv().await { + match logitem { + LogItem::Line(_) => { + // ignore + }, + LogItem::Progress(u) => { + self.bar.set_position(u as u64); + }, + LogItem::CurrentPhase(phasename) => { + self.bar.set_message(&format!("{} Phase: {}", self.job_id, phasename)); + }, + LogItem::State(Ok(s)) => { + self.bar.set_message(&format!("{} State Ok: {}", self.job_id, s)); + success = Some(true); + }, + LogItem::State(Err(e)) => { + self.bar.set_message(&format!("{} State Err: {}", self.job_id, e)); + success = Some(false); + }, + } + } + + match success { + Some(true) => self.bar.finish_with_message(&format!("{} finished successfully", self.job_id)), + Some(false) => self.bar.finish_with_message(&format!("{} finished with error", self.job_id)), + None => self.bar.finish_with_message(&format!("{} finished", self.job_id)), + } + + Ok(()) + } +} + -- cgit v1.2.3