From b5d081eece72841728d4ccc4a325f390a325e699 Mon Sep 17 00:00:00 2001
From: Matthias Beyer
Date: Sat, 7 Nov 2020 17:21:18 +0100
Subject: Implement progress bars in orchestrator

Signed-off-by: Matthias Beyer
---
 src/orchestrator/orchestrator.rs | 76 ++++++++++++++++++++++++++++++++++++----
 1 file changed, 70 insertions(+), 6 deletions(-)

diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index 04acd4b..0132e08 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -26,8 +26,10 @@ use crate::db::models::Submit;
 use crate::db::models::EnvVar;
 use crate::job::JobResource;
 use crate::filestore::MergedStores;
+use crate::util::progress::ProgressBars;
 
 pub struct Orchestrator {
+    progress_generator: ProgressBars,
     scheduler: EndpointScheduler,
     staging_store: Arc<RwLock<StagingStore>>,
     release_store: Arc<RwLock<ReleaseStore>>,
@@ -38,6 +40,7 @@ pub struct Orchestrator {
 
 #[derive(TypedBuilder)]
 pub struct OrchestratorSetup {
+    progress_generator: ProgressBars,
     endpoint_config: Vec<EndpointConfiguration>,
     staging_store: Arc<RwLock<StagingStore>>,
     release_store: Arc<RwLock<ReleaseStore>>,
@@ -52,6 +55,7 @@ impl OrchestratorSetup {
         let scheduler = EndpointScheduler::setup(self.endpoint_config, self.staging_store.clone()).await?;
 
         Ok(Orchestrator {
+            progress_generator: self.progress_generator,
             scheduler: scheduler,
             staging_store: self.staging_store,
             release_store: self.release_store,
@@ -67,8 +71,19 @@ impl Orchestrator {
     pub async fn run(self) -> Result<()> {
         use tokio::stream::StreamExt;
 
+        let number_of_jobsets = self.jobsets.len();
         let _database = self.database;
-        for jobset in self.jobsets.into_iter() {
+
+        for (i, jobset) in self.jobsets.into_iter().enumerate() {
+            // create a multi-bar for showing the overall jobset status as well as one bar per
+            // running job.
+            let jobset_bar = indicatif::MultiProgress::default();
+
+            // Create an "overview bar", which shows the progress of all jobs of the jobset combined
+            let jobset_overview_bar = jobset_bar.add({
+                self.progress_generator.jobset_bar(i + 1, number_of_jobsets, jobset.len())
+            });
+
             let merged_store = MergedStores::new(self.release_store.clone(), self.staging_store.clone());
 
             let (results, logs) = { // run the jobs in the set
@@ -76,31 +91,80 @@ impl Orchestrator {
                 let unordered_results = futures::stream::FuturesUnordered::new();
                 let unordered_receivers = futures::stream::FuturesUnordered::new();
                 for runnable in jobset.into_runables(&merged_store) {
                     let runnable = runnable?;
-                    trace!("Runnable {} for package {}", runnable.uuid(), runnable.package().name());
+                    let job_id = runnable.uuid().clone();
+                    trace!("Runnable {} for package {}", job_id, runnable.package().name());
 
                     let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<LogItem>();
                     let jobhandle = self.scheduler.schedule_job(runnable, sender).await?;
                     trace!("Jobhandle -> {:?}", jobhandle);
+
+                    // clone the bar here, so we can give a handle to the async result fetcher closure
+                    // where we tick() it as soon as the job returns the result (= is finished)
+                    let bar = jobset_overview_bar.clone();
+
                     unordered_results.push(async move {
-                        jobhandle.get_result()
-                            .await
+                        let r = jobhandle.get_result().await;
+                        bar.tick();
+                        r
                     });
 
                     unordered_receivers.push(async move {
-                        receiver
+                        (job_id, receiver)
                     });
                 }
 
                 (unordered_results.collect::<Result<Vec<_>>>(), unordered_receivers.collect::<Vec<_>>())
            };
 
-            let (results, _logs) = tokio::join!(results, logs);
+            let (results, logs) = tokio::join!(results, logs); // TODO: Use logs.
+
+            {
+                let log_processing_results = futures::stream::FuturesUnordered::new();
+                for (job_id, mut log) in logs {
+                    let bar = jobset_bar.add(self.progress_generator.job_bar(&job_id));
+                    log_processing_results.push(async move {
+                        let mut success = None;
+                        while let Some(logitem) = log.recv().await {
+                            match logitem {
+                                LogItem::Line(_) => {
+                                    // ignore
+                                },
+                                LogItem::Progress(u) => {
+                                    bar.set_position(u as u64);
+                                },
+                                LogItem::CurrentPhase(phasename) => {
+                                    bar.set_message(&format!("{} Phase: {}", job_id, phasename));
+                                },
+                                LogItem::State(Ok(s)) => {
+                                    bar.set_message(&format!("{} State Ok: {}", job_id, s));
+                                    success = Some(true);
+                                },
+                                LogItem::State(Err(e)) => {
+                                    bar.set_message(&format!("{} State Err: {}", job_id, e));
+                                    success = Some(false);
+                                },
+                            }
+                        }
+
+                        match success {
+                            Some(true) => bar.finish_with_message(&format!("{} finished successfully", job_id)),
+                            Some(false) => bar.finish_with_message(&format!("{} finished with error", job_id)),
+                            None => bar.finish_with_message(&format!("{} finished", job_id)),
+                        }
+                    });
+                }
+
+                let _ = log_processing_results.collect::<Vec<_>>().await;
+            }
+
             let results = results?
                 .into_iter()
                 .flatten()
                 .collect::<Vec<_>>();
 
+            let _ = jobset_overview_bar.finish();
+            let _ = jobset_bar.join()?;
+
             {
                 // check if all paths that were written are actually there in the staging store
                 let staging_store_lock = self.staging_store
                     .read()
-- 
cgit v1.2.3
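
Note (not part of the patch): the diff calls self.progress_generator.jobset_bar(..) and
self.progress_generator.job_bar(..), but crate::util::progress::ProgressBars itself is not
included in this change. The sketch below is a hypothetical illustration of what such a
helper could look like. It assumes the indicatif 0.15-era API (set_message() taking &str,
ProgressStyle::template() returning ProgressStyle), and it assumes the per-job bar has a
length of 100 because LogItem::Progress(u) is fed directly to set_position(), i.e. progress
is reported as a percentage. The struct fields, template string, and method bodies are
illustrative, not taken from the actual butido sources.

    // Hypothetical sketch of a ProgressBars factory for the orchestrator.
    use indicatif::{ProgressBar, ProgressStyle};
    use uuid::Uuid;

    pub struct ProgressBars {
        // One shared style, cloned into every bar that is handed out.
        style: ProgressStyle,
    }

    impl ProgressBars {
        pub fn new() -> Self {
            ProgressBars {
                style: ProgressStyle::default_bar()
                    .template("[{elapsed_precise}] {bar:40} {pos:>3}/{len:3} {msg}"),
            }
        }

        // Overview bar for one jobset: its length is the number of jobs in the set,
        // and the message shows which jobset (out of how many) is currently running.
        pub fn jobset_bar(&self, current: usize, total: usize, jobs_in_set: usize) -> ProgressBar {
            let bar = ProgressBar::new(jobs_in_set as u64);
            bar.set_style(self.style.clone());
            bar.set_message(&format!("Jobset {}/{}", current, total));
            bar
        }

        // Per-job bar, keyed by the job UUID. Length 100 is an assumption: the
        // orchestrator sets the position from LogItem::Progress(u) as a percentage.
        pub fn job_bar(&self, id: &Uuid) -> ProgressBar {
            let bar = ProgressBar::new(100);
            bar.set_style(self.style.clone());
            bar.set_message(&format!("Job {}", id));
            bar
        }
    }

With a helper along these lines, jobset_bar(i + 1, number_of_jobsets, jobset.len()) and
job_bar(&job_id) hand out preconfigured indicatif::ProgressBar values, which the orchestrator
then attaches to its per-jobset MultiProgress and drives from the job result futures and the
log receivers shown in the diff above.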